package com.hulu.slow.kafka;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.sankuai.xm.kafka.client.IProducerProcessor;
import com.sankuai.xm.kafka.client.factory.KafkaProducerBuildFactory;
import com.sankuai.xm.kafka.client.utils.FutureCallback;

/**
 * 添加好友时，用于把数据写入kafka，供storm的hulu.recommend服务消费
 * @author likg
 * @date 2016-05-31.
 */
@Component
public class StormProducer {

    private static final Logger log = LoggerFactory.getLogger(StormProducer.class);

 //   private static Counter stormProducerError = MetricBeans.counter("business.StormProducer.error");

    private IProducerProcessor producer;

    @PostConstruct
    public void init() {
        try {
            producer = KafkaProducerBuildFactory.init();
        } catch (Exception e) {
        //    stormProducerError.inc();
            log.error("build kafka StormProducer failed.", e);
        }
    }

    public void sendMessageAsync(final byte[] json, final long id) throws Exception {
        producer.sendAsyncMessage(json, String.valueOf(id), new FutureCallback() {
            @Override
            public void onSuccess(Object asyncProducerResult) {
                log.info("StormProducer send to kafka success.");
            }

            @Override
            public void onFailure(Object msg, Throwable throwable) {
             //   stormProducerError.inc();
                log.error("StormProducer send to kafka error, msg={}", new String(json));
            }
        });
    }

}
